import esphome.codegen as cg
import esphome.config_validation as cv
from esphome.const import (
    CONF_ALL,
    CONF_ANY,
    CONF_AUTOMATION_ID,
    CONF_CONDITION,
    CONF_COUNT,
    CONF_ELSE,
    CONF_ID,
    CONF_THEN,
    CONF_TIME,
    CONF_TIMEOUT,
    CONF_TRIGGER_ID,
    CONF_TYPE_ID,
    CONF_UPDATE_INTERVAL,
)
from esphome.core import ID, Lambda
from esphome.cpp_generator import (
    LambdaExpression,
    MockObj,
    MockObjClass,
    TemplateArgsType,
)
from esphome.schema_extractors import SCHEMA_EXTRACT, schema_extractor
from esphome.types import ConfigType
from esphome.util import Registry


def maybe_simple_id(*validators):
    """Allow a raw ID to be specified in place of a config block.
    If the value that's being validated is a dictionary, it's passed as-is to the specified validators. Otherwise, it's
    wrapped in a dict that looks like ``{"id": <value>}``, and that dict is then handed off to the specified validators.
    """
    return maybe_conf(CONF_ID, *validators)


def maybe_conf(conf, *validators):
    """Allow a raw value to be specified in place of a config block.
    If the value that's being validated is a dictionary, it's passed as-is to the specified validators. Otherwise, it's
    wrapped in a dict that looks like ``{<conf>: <value>}``, and that dict is then handed off to the specified
    validators.
    (This is a general case of ``maybe_simple_id`` that allows the wrapping key to be something other than ``id``.)
    """
    validator = cv.All(*validators)

    @schema_extractor("maybe")
    def validate(value):
        if value == SCHEMA_EXTRACT:
            return (validator, conf)

        if isinstance(value, dict):
            return validator(value)
        with cv.remove_prepend_path([conf]):
            return validator({conf: value})

    return validate


def register_action(name: str, action_type: MockObjClass, schema: cv.Schema):
    return ACTION_REGISTRY.register(name, action_type, schema)


def register_condition(name: str, condition_type: MockObjClass, schema: cv.Schema):
    return CONDITION_REGISTRY.register(name, condition_type, schema)


Action = cg.esphome_ns.class_("Action")
Trigger = cg.esphome_ns.class_("Trigger")
ACTION_REGISTRY = Registry()
Condition = cg.esphome_ns.class_("Condition")
CONDITION_REGISTRY = Registry()
validate_action = cv.validate_registry_entry("action", ACTION_REGISTRY)
validate_action_list = cv.validate_registry("action", ACTION_REGISTRY)
validate_condition = cv.validate_registry_entry("condition", CONDITION_REGISTRY)
validate_condition_list = cv.validate_registry("condition", CONDITION_REGISTRY)


def validate_potentially_and_condition(value):
    if isinstance(value, list):
        with cv.remove_prepend_path(["and"]):
            return validate_condition({"and": value})
    return validate_condition(value)


def validate_potentially_or_condition(value):
    if isinstance(value, list):
        with cv.remove_prepend_path(["or"]):
            return validate_condition({"or": value})
    return validate_condition(value)


DelayAction = cg.esphome_ns.class_("DelayAction", Action, cg.Component)
LambdaAction = cg.esphome_ns.class_("LambdaAction", Action)
StatelessLambdaAction = cg.esphome_ns.class_("StatelessLambdaAction", Action)
IfAction = cg.esphome_ns.class_("IfAction", Action)
WhileAction = cg.esphome_ns.class_("WhileAction", Action)
RepeatAction = cg.esphome_ns.class_("RepeatAction", Action)
WaitUntilAction = cg.esphome_ns.class_("WaitUntilAction", Action, cg.Component)
UpdateComponentAction = cg.esphome_ns.class_("UpdateComponentAction", Action)
SuspendComponentAction = cg.esphome_ns.class_("SuspendComponentAction", Action)
ResumeComponentAction = cg.esphome_ns.class_("ResumeComponentAction", Action)
Automation = cg.esphome_ns.class_("Automation")

LambdaCondition = cg.esphome_ns.class_("LambdaCondition", Condition)
StatelessLambdaCondition = cg.esphome_ns.class_("StatelessLambdaCondition", Condition)
ForCondition = cg.esphome_ns.class_("ForCondition", Condition, cg.Component)


def new_lambda_pvariable(
    id_obj: ID,
    lambda_expr: LambdaExpression,
    stateless_class: MockObjClass,
    template_arg: cg.TemplateArguments | None = None,
) -> MockObj:
    """Create Pvariable for lambda, using stateless class if applicable.

    Combines ID selection and Pvariable creation in one call. For stateless
    lambdas (empty capture), uses function pointer instead of std::function.

    Args:
        id_obj: The ID object (action_id, condition_id, or filter_id)
        lambda_expr: The lambda expression object
        stateless_class: The stateless class to use for stateless lambdas
        template_arg: Optional template arguments (for actions/conditions)

    Returns:
        The created Pvariable
    """
    # For stateless lambdas, use function pointer instead of std::function
    if lambda_expr.capture == "":
        id_obj = id_obj.copy()
        id_obj.type = stateless_class

    if template_arg is not None:
        return cg.new_Pvariable(id_obj, template_arg, lambda_expr)
    return cg.new_Pvariable(id_obj, lambda_expr)


def validate_automation(extra_schema=None, extra_validators=None, single=False):
    if extra_schema is None:
        extra_schema = {}
    if isinstance(extra_schema, cv.Schema):
        extra_schema = extra_schema.schema
    schema = AUTOMATION_SCHEMA.extend(extra_schema)

    def validator_(value):
        if isinstance(value, list):
            # List of items, there are two possible options here, either a sequence of
            # actions (no then:) or a list of automations.
            try:
                # First try as a sequence of actions
                # If that succeeds, return immediately
                with cv.remove_prepend_path([CONF_THEN]):
                    return [schema({CONF_THEN: value})]
            except cv.Invalid as err:
                # Next try as a sequence of automations
                try:
                    return cv.Schema([schema])(value)
                except cv.Invalid as err2:
                    if "extra keys not allowed" in str(err2) and len(err2.path) == 2:
                        # pylint: disable=raise-missing-from
                        raise err
                    if "Unable to find action" in str(err):
                        raise err2
                    raise cv.MultipleInvalid([err, err2])
        elif isinstance(value, dict):
            if CONF_THEN in value:
                return [schema(value)]
            with cv.remove_prepend_path([CONF_THEN]):
                return [schema({CONF_THEN: value})]
        # This should only happen with invalid configs, but let's have a nice error message.
        return [schema(value)]

    @schema_extractor("automation")
    def validator(value):
        if value == SCHEMA_EXTRACT:
            return schema

        value = validator_(value)
        if extra_validators is not None:
            value = cv.Schema([extra_validators])(value)
        if single:
            if len(value) != 1:
                raise cv.Invalid("This trigger allows only a single automation")
            return value[0]
        return value

    return validator


AUTOMATION_SCHEMA = cv.Schema(
    {
        cv.GenerateID(CONF_TRIGGER_ID): cv.declare_id(Trigger),
        cv.GenerateID(CONF_AUTOMATION_ID): cv.declare_id(Automation),
        cv.Required(CONF_THEN): validate_action_list,
    }
)

AndCondition = cg.esphome_ns.class_("AndCondition", Condition)
OrCondition = cg.esphome_ns.class_("OrCondition", Condition)
NotCondition = cg.esphome_ns.class_("NotCondition", Condition)
XorCondition = cg.esphome_ns.class_("XorCondition", Condition)


@register_condition("and", AndCondition, validate_condition_list)
async def and_condition_to_code(
    config: ConfigType,
    condition_id: ID,
    template_arg: cg.TemplateArguments,
    args: TemplateArgsType,
) -> MockObj:
    conditions = await build_condition_list(config, template_arg, args)
    return cg.new_Pvariable(condition_id, template_arg, conditions)


@register_condition("or", OrCondition, validate_condition_list)
async def or_condition_to_code(
    config: ConfigType,
    condition_id: ID,
    template_arg: cg.TemplateArguments,
    args: TemplateArgsType,
) -> MockObj:
    conditions = await build_condition_list(config, template_arg, args)
    return cg.new_Pvariable(condition_id, template_arg, conditions)


@register_condition("all", AndCondition, validate_condition_list)
async def all_condition_to_code(
    config: ConfigType,
    condition_id: ID,
    template_arg: cg.TemplateArguments,
    args: TemplateArgsType,
) -> MockObj:
    conditions = await build_condition_list(config, template_arg, args)
    return cg.new_Pvariable(condition_id, template_arg, conditions)


@register_condition("any", OrCondition, validate_condition_list)
async def any_condition_to_code(
    config: ConfigType,
    condition_id: ID,
    template_arg: cg.TemplateArguments,
    args: TemplateArgsType,
) -> MockObj:
    conditions = await build_condition_list(config, template_arg, args)
    return cg.new_Pvariable(condition_id, template_arg, conditions)


@register_condition("not", NotCondition, validate_potentially_and_condition)
async def not_condition_to_code(
    config: ConfigType,
    condition_id: ID,
    template_arg: cg.TemplateArguments,
    args: TemplateArgsType,
) -> MockObj:
    condition = await build_condition(config, template_arg, args)
    return cg.new_Pvariable(condition_id, template_arg, condition)


@register_condition("xor", XorCondition, validate_condition_list)
async def xor_condition_to_code(
    config: ConfigType,
    condition_id: ID,
    template_arg: cg.TemplateArguments,
    args: TemplateArgsType,
) -> MockObj:
    conditions = await build_condition_list(config, template_arg, args)
    return cg.new_Pvariable(condition_id, template_arg, conditions)


@register_condition("lambda", LambdaCondition, cv.returning_lambda)
async def lambda_condition_to_code(
    config: ConfigType,
    condition_id: ID,
    template_arg: cg.TemplateArguments,
    args: TemplateArgsType,
) -> MockObj:
    lambda_ = await cg.process_lambda(config, args, return_type=bool)
    return new_lambda_pvariable(
        condition_id, lambda_, StatelessLambdaCondition, template_arg
    )


@register_condition(
    "for",
    ForCondition,
    cv.Schema(
        {
            cv.Required(CONF_TIME): cv.templatable(
                cv.positive_time_period_milliseconds
            ),
            cv.Required(CONF_CONDITION): validate_potentially_and_condition,
        }
    ).extend(cv.COMPONENT_SCHEMA),
)
async def for_condition_to_code(
    config: ConfigType,
    condition_id: ID,
    template_arg: cg.TemplateArguments,
    args: TemplateArgsType,
) -> MockObj:
    condition = await build_condition(
        config[CONF_CONDITION], cg.TemplateArguments(), []
    )
    var = cg.new_Pvariable(condition_id, template_arg, condition)
    await cg.register_component(var, config)
    templ = await cg.templatable(config[CONF_TIME], args, cg.uint32)
    cg.add(var.set_time(templ))
    return var


@register_condition(
    "component.is_idle",
    LambdaCondition,
    maybe_simple_id(
        {
            cv.Required(CONF_ID): cv.use_id(cg.Component),
        }
    ),
)
async def component_is_idle_condition_to_code(
    config: ConfigType,
    condition_id: ID,
    template_arg: cg.TemplateArguments,
    args: TemplateArgsType,
) -> MockObj:
    comp = await cg.get_variable(config[CONF_ID])
    lambda_ = await cg.process_lambda(
        Lambda(f"return {comp}->is_idle();"), args, return_type=bool
    )
    return new_lambda_pvariable(
        condition_id, lambda_, StatelessLambdaCondition, template_arg
    )


@register_action(
    "delay", DelayAction, cv.templatable(cv.positive_time_period_milliseconds)
)
async def delay_action_to_code(
    config: ConfigType,
    action_id: ID,
    template_arg: cg.TemplateArguments,
    args: TemplateArgsType,
) -> MockObj:
    var = cg.new_Pvariable(action_id, template_arg)
    await cg.register_component(var, {})
    template_ = await cg.templatable(config, args, cg.uint32)
    cg.add(var.set_delay(template_))
    return var


@register_action(
    "if",
    IfAction,
    cv.All(
        {
            cv.Exclusive(
                CONF_CONDITION, CONF_CONDITION
            ): validate_potentially_and_condition,
            cv.Exclusive(CONF_ANY, CONF_CONDITION): validate_potentially_or_condition,
            cv.Exclusive(CONF_ALL, CONF_CONDITION): validate_potentially_and_condition,
            cv.Optional(CONF_THEN): validate_action_list,
            cv.Optional(CONF_ELSE): validate_action_list,
        },
        cv.has_at_least_one_key(CONF_THEN, CONF_ELSE),
        cv.has_at_least_one_key(CONF_CONDITION, CONF_ANY, CONF_ALL),
    ),
)
async def if_action_to_code(
    config: ConfigType,
    action_id: ID,
    template_arg: cg.TemplateArguments,
    args: TemplateArgsType,
) -> MockObj:
    cond_conf = next(el for el in config if el in (CONF_ANY, CONF_ALL, CONF_CONDITION))
    condition = await build_condition(config[cond_conf], template_arg, args)
    var = cg.new_Pvariable(action_id, template_arg, condition)
    if CONF_THEN in config:
        actions = await build_action_list(config[CONF_THEN], template_arg, args)
        cg.add(var.add_then(actions))
    if CONF_ELSE in config:
        actions = await build_action_list(config[CONF_ELSE], template_arg, args)
        cg.add(var.add_else(actions))
    return var


@register_action(
    "while",
    WhileAction,
    cv.Schema(
        {
            cv.Required(CONF_CONDITION): validate_potentially_and_condition,
            cv.Required(CONF_THEN): validate_action_list,
        }
    ),
)
async def while_action_to_code(
    config: ConfigType,
    action_id: ID,
    template_arg: cg.TemplateArguments,
    args: TemplateArgsType,
) -> MockObj:
    condition = await build_condition(config[CONF_CONDITION], template_arg, args)
    var = cg.new_Pvariable(action_id, template_arg, condition)
    actions = await build_action_list(config[CONF_THEN], template_arg, args)
    cg.add(var.add_then(actions))
    return var


@register_action(
    "repeat",
    RepeatAction,
    cv.Schema(
        {
            cv.Required(CONF_COUNT): cv.templatable(cv.positive_not_null_int),
            cv.Required(CONF_THEN): validate_action_list,
        }
    ),
)
async def repeat_action_to_code(
    config: ConfigType,
    action_id: ID,
    template_arg: cg.TemplateArguments,
    args: TemplateArgsType,
) -> MockObj:
    var = cg.new_Pvariable(action_id, template_arg)
    count_template = await cg.templatable(config[CONF_COUNT], args, cg.uint32)
    cg.add(var.set_count(count_template))
    actions = await build_action_list(
        config[CONF_THEN],
        cg.TemplateArguments(cg.uint32, *template_arg.args),
        [(cg.uint32, "iteration"), *args],
    )
    cg.add(var.add_then(actions))
    return var


_validate_wait_until = cv.maybe_simple_value(
    {
        cv.Required(CONF_CONDITION): validate_potentially_and_condition,
        cv.Optional(CONF_TIMEOUT): cv.templatable(cv.positive_time_period_milliseconds),
    },
    key=CONF_CONDITION,
)


@register_action("wait_until", WaitUntilAction, _validate_wait_until)
async def wait_until_action_to_code(
    config: ConfigType,
    action_id: ID,
    template_arg: cg.TemplateArguments,
    args: TemplateArgsType,
) -> MockObj:
    condition = await build_condition(config[CONF_CONDITION], template_arg, args)
    var = cg.new_Pvariable(action_id, template_arg, condition)
    if CONF_TIMEOUT in config:
        template_ = await cg.templatable(config[CONF_TIMEOUT], args, cg.uint32)
        cg.add(var.set_timeout_value(template_))
    await cg.register_component(var, {})
    return var


@register_action("lambda", LambdaAction, cv.lambda_)
async def lambda_action_to_code(
    config: ConfigType,
    action_id: ID,
    template_arg: cg.TemplateArguments,
    args: TemplateArgsType,
) -> MockObj:
    lambda_ = await cg.process_lambda(config, args, return_type=cg.void)
    return new_lambda_pvariable(action_id, lambda_, StatelessLambdaAction, template_arg)


@register_action(
    "component.update",
    UpdateComponentAction,
    maybe_simple_id(
        {
            cv.Required(CONF_ID): cv.use_id(cg.PollingComponent),
        }
    ),
)
async def component_update_action_to_code(
    config: ConfigType,
    action_id: ID,
    template_arg: cg.TemplateArguments,
    args: TemplateArgsType,
) -> MockObj:
    comp = await cg.get_variable(config[CONF_ID])
    return cg.new_Pvariable(action_id, template_arg, comp)


@register_action(
    "component.suspend",
    SuspendComponentAction,
    maybe_simple_id(
        {
            cv.Required(CONF_ID): cv.use_id(cg.PollingComponent),
        }
    ),
)
async def component_suspend_action_to_code(
    config: ConfigType,
    action_id: ID,
    template_arg: cg.TemplateArguments,
    args: TemplateArgsType,
) -> MockObj:
    comp = await cg.get_variable(config[CONF_ID])
    return cg.new_Pvariable(action_id, template_arg, comp)


@register_action(
    "component.resume",
    ResumeComponentAction,
    maybe_simple_id(
        {
            cv.Required(CONF_ID): cv.use_id(cg.PollingComponent),
            cv.Optional(CONF_UPDATE_INTERVAL): cv.templatable(
                cv.positive_time_period_milliseconds
            ),
        }
    ),
)
async def component_resume_action_to_code(
    config: ConfigType,
    action_id: ID,
    template_arg: cg.TemplateArguments,
    args: TemplateArgsType,
) -> MockObj:
    comp = await cg.get_variable(config[CONF_ID])
    var = cg.new_Pvariable(action_id, template_arg, comp)
    if CONF_UPDATE_INTERVAL in config:
        template_ = await cg.templatable(config[CONF_UPDATE_INTERVAL], args, int)
        cg.add(var.set_update_interval(template_))
    return var


async def build_action(
    full_config: ConfigType, template_arg: cg.TemplateArguments, args: TemplateArgsType
) -> MockObj:
    registry_entry, config = cg.extract_registry_entry_config(
        ACTION_REGISTRY, full_config
    )
    action_id = full_config[CONF_TYPE_ID]
    builder = registry_entry.coroutine_fun
    return await builder(config, action_id, template_arg, args)


async def build_action_list(
    config: list[ConfigType], templ: cg.TemplateArguments, arg_type: TemplateArgsType
) -> list[MockObj]:
    actions: list[MockObj] = []
    for conf in config:
        action = await build_action(conf, templ, arg_type)
        actions.append(action)
    return actions


async def build_condition(
    full_config: ConfigType, template_arg: cg.TemplateArguments, args: TemplateArgsType
) -> MockObj:
    registry_entry, config = cg.extract_registry_entry_config(
        CONDITION_REGISTRY, full_config
    )
    action_id = full_config[CONF_TYPE_ID]
    builder = registry_entry.coroutine_fun
    return await builder(config, action_id, template_arg, args)


async def build_condition_list(
    config: ConfigType, templ: cg.TemplateArguments, args: TemplateArgsType
) -> list[MockObj]:
    conditions: list[MockObj] = []
    for conf in config:
        condition = await build_condition(conf, templ, args)
        conditions.append(condition)
    return conditions


async def build_automation(
    trigger: MockObj, args: TemplateArgsType, config: ConfigType
) -> MockObj:
    arg_types = [arg[0] for arg in args]
    templ = cg.TemplateArguments(*arg_types)
    obj = cg.new_Pvariable(config[CONF_AUTOMATION_ID], templ, trigger)
    actions = await build_action_list(config[CONF_THEN], templ, args)
    cg.add(obj.add_actions(actions))
    return obj
